feat: experimental Spark regex support via codegen dispatcher#4239
feat: experimental Spark regex support via codegen dispatcher#4239andygrove wants to merge 58 commits into
Conversation
Also fix CometArrayExpressionSuite compilation by qualifying the Spark udf() call, which was shadowed by the new org.apache.comet.udf package.
Implements a DataFusion PhysicalExpr that evaluates child expressions, exports the results as Arrow FFI arrays, calls CometUdfBridge.evaluate() via JNI, and imports the output array. Adds datafusion-comet-jni-bridge as a dependency of the spark-expr crate.
…UDF class via context classloader Wrap the JNI body in try/finally so input ValueVectors and the result vector are always closed, even when the UDF or arrow export throws. Resolve the CometUDF class through the thread context classloader so user-supplied UDF jars (added via spark.jars) are visible from the bridge.
…ns fall back to Spark When routing RLike through the JVM UDF, reject Literal(null) and patterns that fail Pattern.compile during planning. Both cases now produce withInfo + None, letting Spark evaluate the expression instead of crashing the executor task with PatternSyntaxException or NullPointerException.
Make comet_udf_bridge an Option in JVMClasses so a missing org.apache.comet.udf.CometUdfBridge class (e.g. shading dropped org.apache.comet.udf.*) no longer crashes executor JVM init. The JVM-UDF dispatch path returns a clear ExecutionError when the bridge is unavailable. Also clarify the FFI lifetime contract on the result import.
Replace string literals "rust"/"java" used for the regexp engine selector with named constants on CometConf. Tighten CometRLike.getSupportLevel so it only reports Compatible(None) when the pattern is a Literal, matching the actual constraint enforced by the convert path.
Literal-folded children no longer get expanded to batch-row count before crossing JNI; ColumnarValue::Scalar is materialized at length 1, avoiding an O(rows) copy of values that never vary across the batch. Document the contract on CometUDF: scalar inputs arrive as length-1 vectors, vector inputs at the batch row count, and the result must match the longest input.
|
That does raise the issue of how aggressively I should review these if they'll be dead code soon. There are number of optimizations we're leaving on the table, but maybe that doesn't matter right now. |
My argument for allowing this PR in would be that it adds all the regression tests and gives us an immediate performance win today. The refactor to update the UDF implementations to use the new framework should be a net reduction in line count I am assuming and easy to review? |
Likely not due to the codegen work I've done, but we'll cross that bridge when we come to it. If you mark this ready for review today I'll review it. |
These SQL test files exercise Java-only regex features (backreferences, lookahead, embedded flags) and previously relied on the JVM engine being the default. After the default reverted to the Rust engine, they need to explicitly opt in via spark.comet.exec.regexp.engine=java.
# Conflicts: # spark/src/main/scala/org/apache/comet/serde/strings.scala
|
@mbutrovich following on from our discussion about configs yesterday, I filed an issue where we can have that discussion. #4310 |
…nature PR apache#4306 added a numRows parameter to CometUDF.evaluate; merging main into this branch brought in the trait change but the six regexp UDF implementations still used the old single-argument signature, breaking comet-common compilation across all Spark profiles.
…ee-pr-4239 # Conflicts: # spark/src/main/scala/org/apache/comet/udf/RegExpExtractAllUDF.scala # spark/src/main/scala/org/apache/comet/udf/RegExpExtractUDF.scala # spark/src/main/scala/org/apache/comet/udf/RegExpInStrUDF.scala # spark/src/main/scala/org/apache/comet/udf/RegExpLikeUDF.scala # spark/src/main/scala/org/apache/comet/udf/RegExpReplaceUDF.scala # spark/src/main/scala/org/apache/comet/udf/StringSplitUDF.scala
Adds a master switch (default false) for the experimental JVM UDF framework so the Java regex engine cannot be activated without an explicit opt-in. With engine=java but jvmUdf.enabled=false, the six regex serdes return Unsupported with a message naming the master switch instead of silently using either path. Also extends Incompatible with optedInBy: Option[String] so a config (e.g. an engine selector) can serve as a per-expression incompatibility opt-in. Existing allowIncompatible flags continue to work; optedInBy is OR'd into the gating check in QueryPlanSerde. No existing serde uses optedInBy yet — this lays the foundation for the config simplification discussed in apache#4310.
|
@mbutrovich I pushed some config changes, inspired by our earlier discussions - let me know what you think of the direction |
Default engine is now `java` (routes through the JVM UDF when spark.comet.jvmUdf.enabled=true; falls back to Spark otherwise). Setting engine=rust runs the native Rust regex engine and is itself the opt-in for the semantic differences from Java regex — no separate allowIncompatible flag for the regex family. - Remove RegExp.isSupportedPattern (was a placeholder returning false) - Replace per-serde engine checks with a single RegexpRoute helper - Drop redundant *_rust_enabled.sql variants and migrate CometStringExpressionSuite split tests off the legacy per-class allowIncompatible flag
I posted this comment prematurely - I still had local changes. They are pushed now. |
Dual-impl regex serdes (rlike, regexp_replace, split) now return Incompatible(notes, optedInBy="spark.comet.exec.regexp.engine=rust") for the native rust path instead of Compatible. The standard QueryPlanSerde gating then sees engine=rust as the opt-in via the optedInBy mechanism introduced earlier, so the incompatibility is visible in EXPLAIN/logs rather than hidden behind a routing-helper short-circuit.
Drop redundant interpolators in COMET_REGEXP_ENGINE doc string and remove the redundant CometConf self-import in CometStringExpressionSuite to satisfy scalafix. Switch existing rlike/regexp_replace tests to opt in via COMET_REGEXP_ENGINE=rust now that the engine selector is the gate for the Rust path, and reformat regex.md via prettier.
Resolve conflicts in pr_build_{linux,macos}.yml by integrating both the
new codegen-suite additions from main and the CometRegExpJvmSuite from
the PR, dropping the obsolete standalone "sql" matrix entry that main
folded into the "spark" matrix.
Resolve CometConf.scala by retaining both COMET_JVM_UDF_ENABLED /
COMET_REGEXP_ENGINE from the PR and COMET_SCALA_UDF_CODEGEN_ENABLED
from main. The follow-up refactor drops COMET_JVM_UDF_ENABLED in favor
of COMET_SCALA_UDF_CODEGEN_ENABLED.
…of hand-written UDFs Replace the six hand-written `RegExp*UDF` / `StringSplitUDF` JVM UDF implementations with the Arrow-direct codegen dispatcher introduced in PR apache#4417 (`CometScalaUDF.emitJvmCodegenDispatch`). The dispatcher Janino-compiles Spark's own `doGenCode` for the expression, so the regex family inherits Spark-identical semantics with no per-expression glue code. Changes: - Delete `spark/src/main/scala/org/apache/comet/udf/RegExp*UDF.scala` and `StringSplitUDF.scala`. Their behavior is now provided by Spark's `doGenCode` running inside the dispatcher. - Rewrite the regex serdes in `strings.scala`. Expressions with no native Rust path (`RegExpExtract`, `RegExpExtractAll`, `RegExpInStr`) share a new `CometRegexpCodegenOnly` base; expressions with a native path (`RLike`, `RegExpReplace`, `StringSplit`) keep an explicit route table where the JVM arm now delegates to `CometScalaUDF.emitJvmCodegenDispatch`. - Drop the `spark.comet.jvmUdf.enabled` config. The codegen dispatcher already has its own master switch (`spark.comet.exec.scalaUDF.codegen.enabled`); gating the regex family on the same flag avoids two flags for the same path. `spark.comet.exec.regexp.engine` keeps the `java`/`rust` selector semantics, and `engine=java` now requires the codegen flag. - Revert the native Rust additions in `jvm_udf/mod.rs` and `jni-bridge/src/lib.rs`. The codegen dispatcher constructs Arrow output fields JVM-side via `CometBatchKernelCodegenOutput.toFfiArrowField`, so the list-vector field-name normalization cast is unnecessary. - Update `CometRegExpJvmSuite`, `CometRegExpBenchmark`, the regex SQL test fixtures, and the regex compatibility doc to reflect the new gating. Test plan: - `CometRegExpJvmSuite`: 45/45 pass (covers all six regex expressions through the codegen dispatcher). - `CometSqlFileTestSuite`: 289/289 pass. - `CometStringExpressionSuite`: 33/33 pass. - `CometCodegenSuite`: 60/60 pass. - `cargo clippy --all-targets --workspace -- -D warnings`: clean.
|
@mbutrovich As discussed, I refactored this PR to use codegen dispatch. |
The per-expression spark.comet.expression.regexp.allowIncompatible flag is no longer consulted by the regex family. Switch to engine=rust so the RLike serde reaches convertViaNativeRegex and emits the 'Only scalar regexp patterns are supported' fallback message the test asserts on.
Which issue does this PR close?
Part of the simplification discussed in #4310.
Rationale for this change
Add experimental support for all Spark regex expressions (
rlike,regexp_extract,regexp_extract_all,regexp_instr,regexp_replace,split) with fulljava.util.regexcompatibility by routing them through the Arrow-direct codegen dispatcher introduced in #4417. The dispatcher Janino-compiles Spark's owndoGenCodefor the expression, so the regex family inherits Spark-identical semantics with no per-expression glue code.The native Rust regex engine is potentially faster but cannot fully match Java regex semantics (backreferences, lookaround, embedded flags, etc.). Rather than expose users to two orthogonal axes (engine choice plus a per-expression
allowIncompatibleflag), this PR collapses to a singleengineselector.Configs
spark.comet.exec.regexp.enginein{rust, java}, defaultjavajava: route the regex expression through the codegen dispatcher so Spark's owndoGenCode(backed byjava.util.regex.Pattern) runs inside the Comet pipeline for full Spark semantics. Requiresspark.comet.exec.scalaUDF.codegen.enabled=true; otherwise the operator falls back to Spark with an explanatory message.rust: run the native DataFusion regex engine when an implementation exists. Setting this is itself the opt-in for the semantic differences from Java regex: no separateallowIncompatibleflag needed. Expressions without a native implementation (regexp_extract,regexp_extract_all,regexp_instr) fall back to Spark.Reuses the existing
spark.comet.exec.scalaUDF.codegen.enabled(defaultfalse) flag introduced for the codegen dispatcher. With pure defaults (engine=java,scalaUDF.codegen.enabled=false) all regex expressions fall back to Spark, matching today's effective default behavior.What changes are included in this PR?
RegexpRoutehelper instrings.scalathat each regex serde delegates to. It picks between the native Rust engine, the codegen dispatcher, and Spark fallback based onengineandscalaUDF.codegen.enabled.regexp_extract,regexp_extract_all,regexp_instr), introduce aCometRegexpCodegenOnlybase class. Each serde is a one-line subclass.rlike,regexp_replace,split), the JVM arm delegates toCometScalaUDF.emitJvmCodegenDispatch. The native arm is unchanged.Incompatible(notes, optedInBy="...engine=rust")so the standard gating inQueryPlanSerderecognizesengine=rustas the opt-in viaoptedInBy.SupportLevel.Incompatiblewith anoptedInBy: Option[String]field, plumbed through scalar- and aggregate-expression gating inQueryPlanSerde.spark.comet.exec.regexp.engineconfig inCometConf.RegExp.isSupportedPattern(was a placeholder always returningfalse).docs/source/user-guide/latest/compatibility/regex.md.How are these changes tested?
CometRegExpJvmSuite: 45 tests covering all six regex expressions withengine=javaand the codegen flag enabled.rlike_{java,rust}.sql,regexp_replace_{java,rust}.sql,split_{java,rust}.sql,regexp_extract.sql,regexp_extract_all.sql,regexp_instr.sql.CometStringExpressionSuite,CometSqlFileTestSuite, andCometCodegenSuitecontinue to pass;splittests migrated from the legacy per-classallowIncompatibleflag toengine=rust.Migration notes
rusttojava. Users on pure defaults see the same effective behavior (Spark fallback for regex), since today'sRegExp.isSupportedPatternalways returnsfalse.spark.comet.expression.regexp.allowIncompatible=trueto enable the rust path should switch tospark.comet.exec.regexp.engine=rust. The per-expression flag is no longer consulted by the regex family.spark.comet.expression.StringSplit.allowIncompatible=trueshould likewise switch tospark.comet.exec.regexp.engine=rust.